package mqtt

import (
	"fmt"
	"sync"
	"time"

	MQTT "github.com/eclipse/paho.mqtt.golang"
)

// MessageHandler is the callback type accepted by SubscribeTopic. It is
// invoked with the topic an incoming message was published on and the raw
// message payload.
type MessageHandler func(topic string, payload []byte)

// Config carries the settings ClientSetup uses to build and connect the
// package-level MQTT client.
type Config struct {
	// UserName is the user name passed to the client options.
	UserName  string
	// Password is the password passed to the client options.
	Password  string
	// ClientId is the MQTT client identifier passed to the client options.
	ClientId  string
	// Brokers lists the broker addresses; each entry is added to the client
	// options in order.
	Brokers   []string
	// StorePath selects a file-backed message store rooted at this path. An
	// empty value or ":memory:" leaves the client's default store in place.
	StorePath string
	// Qos is not read by any code visible in this file; publish and subscribe
	// calls here use QoS 0.
	// NOTE(review): confirm whether this field is meant to be honoured.
	Qos       int32
}

// Package-level state shared by every function in this file.
// NOTE(review): no mutex guards these variables in the visible code; confirm
// whether callers and client callbacks can touch them concurrently.
var (
	// mqttClient is the client created by ClientSetup; it stays nil until
	// setup has succeeded.
	mqttClient    MQTT.Client
	// mqttSubTopics maps a topic to the handler registered for it. Entries are
	// added by SubscribeTopic, removed by UnsubscribeTopic, and replayed by
	// resubscribeTopicOnConnect.
	mqttSubTopics map[string]MQTT.MessageHandler
	// mqttInitOnce ensures the initialisation inside ClientSetup runs once.
	mqttInitOnce  sync.Once
)

// resubscribeTopicOnConnect walks the recorded topic/handler pairs and issues
// a QoS 0 subscribe for each one on the package-level client. Topics whose
// handler is nil are skipped. A failed subscribe is printed and does not stop
// the remaining topics from being processed.
func resubscribeTopicOnConnect() {
	for topic, handler := range mqttSubTopics {
		if handler == nil {
			continue
		}

		token := mqttClient.Subscribe(topic, 0, handler)
		if completed := token.Wait(); !completed {
			continue
		}
		if token.Error() != nil {
			fmt.Println("Subscribe Error:", token.Error())
		}
	}
}

// onConnectHandler is installed as the client's OnConnect callback. It first
// replays every recorded subscription and then prints a success message. The
// client argument is unused; the package-level client is used instead.
func onConnectHandler(_ MQTT.Client) {
	resubscribeTopicOnConnect()
	fmt.Println("mqtt connect success")
}

// onConnectionLost is installed as the client's OnConnectionLost callback and
// prints the text of the error that accompanied the loss of connection.
func onConnectionLost(_ MQTT.Client, err error) {
	reason := err.Error()
	fmt.Println("MQTT Connection lost:", reason)
}

// onReconnecting is installed as the client's OnReconnecting callback. It only
// prints a notice; neither argument is inspected.
func onReconnecting(_ MQTT.Client, _ *MQTT.ClientOptions) {
	fmt.Println("mqtt reconnecting")
}

// mqttSetupErr keeps the result of the one-time initialisation performed by
// ClientSetup so that every later call reports the same outcome instead of a
// misleading nil.
var mqttSetupErr error

// ClientSetup builds the package-level MQTT client from c and connects it.
//
// The client is configured with the brokers, client ID and credentials from c,
// with auto-reconnect, connect-retry and clean-session enabled, and with this
// package's connect / connection-lost / reconnecting callbacks. When
// c.StorePath is non-empty and not ":memory:", a file store at that path is
// used.
//
// Initialisation runs at most once per process. The first call's result is
// remembered: if it failed, subsequent calls return the same error; if it
// succeeded, subsequent calls return nil and their config is ignored.
//
// It returns an error when c is nil or when the connect token completes with
// an error.
func ClientSetup(c *Config) error {
	if c == nil {
		return fmt.Errorf("mqtt client setup: nil config")
	}

	mqttInitOnce.Do(func() {
		opts := MQTT.NewClientOptions()
		for _, broker := range c.Brokers {
			opts.AddBroker(broker)
		}
		opts.SetClientID(c.ClientId)
		opts.SetUsername(c.UserName)
		opts.SetPassword(c.Password)
		opts.SetAutoReconnect(true)
		opts.SetConnectRetry(true)
		opts.SetCleanSession(true)
		opts.OnConnect = onConnectHandler
		opts.OnConnectionLost = onConnectionLost
		opts.OnReconnecting = onReconnecting

		if len(c.StorePath) > 0 && c.StorePath != ":memory:" {
			opts.SetStore(MQTT.NewFileStore(c.StorePath))
		}

		client := MQTT.NewClient(opts)

		// Set the shared state before connecting so it is already in place
		// when the OnConnect callback, which reads both variables, is invoked.
		mqttClient = client
		mqttSubTopics = make(map[string]MQTT.MessageHandler)

		if token := client.Connect(); token.Wait() && token.Error() != nil {
			// Roll back so the rest of the package sees an uninitialised
			// client rather than one that never connected.
			mqttClient = nil
			mqttSubTopics = nil
			mqttSetupErr = token.Error()
		}
	})

	return mqttSetupErr
}

// PublishMsg publishes message on topic with QoS 0 and the retained flag
// cleared, using the package-level client.
//
// It returns an error when the client has not been initialised by
// ClientSetup, when the publish token does not complete within the wait
// window, or when the token completes carrying an error.
func PublishMsg(topic string, message []byte) error {
	if mqttClient == nil {
		return fmt.Errorf("publish to %q: mqtt client not initialised", topic)
	}

	token := mqttClient.Publish(topic, 0, false, message)
	// NOTE(review): the wait window is 300 microseconds, which looks very
	// short; confirm whether milliseconds were intended.
	if !token.WaitTimeout(time.Microsecond * 300) {
		return fmt.Errorf("publish fail")
	}
	// Completion alone does not mean success: the token may hold an error.
	if err := token.Error(); err != nil {
		return fmt.Errorf("publish to %q: %w", topic, err)
	}

	return nil
}

// SubscribeTopic subscribes the package-level client to topic at QoS 0 and
// routes every message received on it to callback, which is given the
// message's own topic and payload.
//
// The handler is recorded in the topic registry before the subscribe request
// is sent, so it remains registered (and is picked up by
// resubscribeTopicOnConnect) even when this call returns a subscribe error.
//
// It returns an error when the client has not been initialised by
// ClientSetup, when callback is nil, or when the subscribe token completes
// with an error.
func SubscribeTopic(topic string, callback MessageHandler) error {
	if mqttClient == nil || mqttSubTopics == nil {
		return fmt.Errorf("subscribe to %q: mqtt client not initialised", topic)
	}
	if callback == nil {
		// Reject up front; a nil callback would otherwise panic only once a
		// message arrives.
		return fmt.Errorf("subscribe to %q: nil callback", topic)
	}

	handler := func(_ MQTT.Client, msg MQTT.Message) {
		callback(msg.Topic(), msg.Payload())
	}
	mqttSubTopics[topic] = handler

	if token := mqttClient.Subscribe(topic, 0, handler); token.Wait() && token.Error() != nil {
		return token.Error()
	}

	return nil
}

// UnsubscribeTopic unsubscribes the package-level client from topic and, once
// that succeeds, drops the topic from the registry so it is no longer replayed
// by resubscribeTopicOnConnect.
//
// It returns an error when the client has not been initialised by
// ClientSetup or when the unsubscribe token completes with an error; in the
// latter case the registry entry is left in place.
func UnsubscribeTopic(topic string) error {
	if mqttClient == nil {
		return fmt.Errorf("unsubscribe from %q: mqtt client not initialised", topic)
	}

	if token := mqttClient.Unsubscribe(topic); token.Wait() && token.Error() != nil {
		return token.Error()
	}

	delete(mqttSubTopics, topic)
	return nil
}
